Skip to content

Commit

Permalink
[ISSUE apache#6346] Support asynchronously notify brokers when their …
Browse files Browse the repository at this point in the history
…roles has been changed (apache#6348)

* feat(controller): support asynchronous notify brokers when roles changed

1. support asynchronous notify brokers when roles changed

* refactor(controller): move creating NotifyService instance logic to ControllerManager's constructor method

1. move creating NotifyService instance logic to ControllerManager's
constructor method
  • Loading branch information
TheR1sing3un authored and miles-ton committed Mar 16, 2023
1 parent 2e845de commit ed40c28
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private boolean startBasicService() {
// The scheduled task for heartbeat sending is not starting now, so we should manually send heartbeat request
this.sendHeartbeatToController();
if (this.masterBrokerId != null || brokerElect()) {
LOGGER.info("Master in this broker set is elected, masterBrokerId: {}, masterBrokerAddr: {}", this.masterAddress, this.masterBrokerId);
LOGGER.info("Master in this broker set is elected, masterBrokerId: {}, masterBrokerAddr: {}", this.masterBrokerId, this.masterAddress);
this.state = State.RUNNING;
this.brokerController.setIsolated(false);
LOGGER.info("All register process has been done, change state to: {}", this.state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@
package org.apache.rocketmq.controller;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
Expand Down Expand Up @@ -67,6 +72,8 @@ public class ControllerManager {
private ExecutorService controllerRequestExecutor;
private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;

private NotifyService notifyService;

public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig) {
this.controllerConfig = controllerConfig;
Expand All @@ -77,6 +84,7 @@ public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig ne
this.configuration.setStorePathFromConfig(this.controllerConfig, "configStorePath");
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.heartbeatManager = new DefaultBrokerHeartbeatManager(this.controllerConfig);
this.notifyService = new NotifyService();
}

public boolean initialize() {
Expand All @@ -93,6 +101,7 @@ protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T valu
return new FutureTaskExt<T>(runnable, value);
}
};
this.notifyService.initialize();
if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) {
throw new IllegalArgumentException("Attribute value controllerDLegerPeers of ControllerConfig is null or empty");
}
Expand Down Expand Up @@ -164,7 +173,7 @@ public void notifyBrokerRoleChanged(final RoleChangeNotifyEntry entry) {
// Inform all active brokers
final Map<Long, String> brokerAddrs = memberGroup.getBrokerAddrs();
brokerAddrs.entrySet().stream().filter(x -> this.heartbeatManager.isBrokerActive(clusterName, brokerName, x.getKey()))
.forEach(x -> doNotifyBrokerRoleChanged(x.getValue(), entry));
.forEach(x -> this.notifyService.notifyBroker(x.getValue(), entry));
}
}

Expand Down Expand Up @@ -214,6 +223,7 @@ public void start() {
public void shutdown() {
this.heartbeatManager.shutdown();
this.controllerRequestExecutor.shutdown();
this.notifyService.shutdown();
this.controller.shutdown();
this.remotingClient.shutdown();
}
Expand Down Expand Up @@ -245,4 +255,77 @@ public BrokerHousekeepingService getBrokerHousekeepingService() {
public Configuration getConfiguration() {
return configuration;
}

class NotifyService {
private ExecutorService executorService;

private Map<String/*brokerAddress*/, NotifyTask/*currentNotifyTask*/> currentNotifyFutures;

public NotifyService() {
}

public void initialize() {
this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ControllerManager_NotifyService_"));
this.currentNotifyFutures = new ConcurrentHashMap<>();
}

public void notifyBroker(String brokerAddress, RoleChangeNotifyEntry entry) {
int masterEpoch = entry.getMasterEpoch();
NotifyTask oldTask = this.currentNotifyFutures.get(brokerAddress);
if (oldTask != null && masterEpoch > oldTask.getMasterEpoch()) {
// cancel current future
Future oldFuture = oldTask.getFuture();
if (oldFuture != null && !oldFuture.isDone()) {
oldFuture.cancel(true);
}
}
final NotifyTask task = new NotifyTask(masterEpoch, null);
Runnable runnable = () -> {
doNotifyBrokerRoleChanged(brokerAddress, entry);
this.currentNotifyFutures.remove(brokerAddress, task);
};
this.currentNotifyFutures.put(brokerAddress, task);
Future<?> future = this.executorService.submit(runnable);
task.setFuture(future);
}

public void shutdown() {
if (!this.executorService.isShutdown()) {
this.executorService.shutdownNow();
}
}

class NotifyTask extends Pair<Integer/*epochMaster*/, Future/*notifyFuture*/> {
public NotifyTask(Integer masterEpoch, Future future) {
super(masterEpoch, future);
}

public Integer getMasterEpoch() {
return super.getObject1();
}

public Future getFuture() {
return super.getObject2();
}

public void setFuture(Future future) {
super.setObject2(future);
}

@Override
public int hashCode() {
return Objects.hashCode(super.getObject1());
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (!(obj instanceof NotifyTask)) {
return false;
}
NotifyTask task = (NotifyTask) obj;
return super.getObject1().equals(task.getObject1());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.controller.impl.controller;
package org.apache.rocketmq.controller;

import java.io.File;
import java.time.Duration;
Expand All @@ -26,7 +26,6 @@
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.controller.ControllerManager;
import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.rocketmq.controller.impl.controller;
package org.apache.rocketmq.controller;

public class ControllerTestBase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.controller.impl.controller.impl;
package org.apache.rocketmq.controller.impl;

import io.openmessaging.storage.dledger.DLedgerConfig;
import java.io.File;
Expand All @@ -31,7 +31,6 @@
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
Expand All @@ -49,9 +48,9 @@
import org.junit.Before;
import org.junit.Test;

import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP;
import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_IP;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.controller.impl.controller.impl;
package org.apache.rocketmq.controller.impl;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.controller.impl.controller.impl.manager;
package org.apache.rocketmq.controller.impl.manager;

import java.util.Arrays;
import java.util.HashSet;
Expand All @@ -29,7 +29,6 @@
import org.apache.rocketmq.controller.impl.event.ControllerResult;
import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
import org.apache.rocketmq.controller.impl.event.EventMessage;
import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
Expand All @@ -51,9 +50,9 @@
import org.junit.Before;
import org.junit.Test;

import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP;
import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_IP;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down

0 comments on commit ed40c28

Please sign in to comment.