Skip to content

Commit

Permalink
Merge pull request #34 from openmessaging/prefered_leader
Browse files Browse the repository at this point in the history
Add preferred leader to Dledger
  • Loading branch information
duhenglucky committed Aug 27, 2019
2 parents 6682c6a + 7ab7fb6 commit 34abd28
Show file tree
Hide file tree
Showing 22 changed files with 617 additions and 101 deletions.
39 changes: 39 additions & 0 deletions src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ public class DLedgerConfig {

private boolean enablePushToFollower = true;

// Id of the node that should preferably hold leadership; bound to the --preferred-leader-id parameter.
// Stays null unless configured.
@Parameter(names = {"--preferred-leader-id"}, description = "Preferred LeaderId")
private String preferredLeaderId;
// NOTE(review): presumably the largest log-index gap tolerated between leader and transferee
// when a leadership transfer is started — the consumer of this value is not visible here, confirm.
private long maxLeadershipTransferWaitIndex = 10000;
// Base delay (ms) added before the next vote request while this node is taking leadership.
private int minTakeLeadershipVoteIntervalMs = 30;
// Exclusive upper limit (ms) of that delay: a random value in [0, max - min) is added to the base.
private int maxTakeLeadershipVoteIntervalMs = 100;


/**
 * Builds the default storage path of this node.
 *
 * @return {@code storeBaseDir}, the platform file separator and {@code "dledger-" + selfId}, concatenated
 */
public String getDefaultPath() {
    final String nodeDirName = "dledger-" + selfId;
    return storeBaseDir + File.separator + nodeDirName;
}
Expand Down Expand Up @@ -331,4 +338,36 @@ public long getCheckPointInterval() {
/**
 * Sets the check point interval.
 *
 * @param checkPointInterval the new interval value (unit not visible here — TODO confirm)
 */
public void setCheckPointInterval(long checkPointInterval) {
this.checkPointInterval = checkPointInterval;
}

/**
 * Returns the configured preferred leader id.
 *
 * @return the preferred leader id, or {@code null} when none has been set
 */
public String getPreferredLeaderId() {
    return this.preferredLeaderId;
}

/**
 * Sets the id of the node that should preferably become leader.
 *
 * @param preferredLeaderId the preferred leader id; stored as given, without validation
 */
public void setPreferredLeaderId(String preferredLeaderId) {
this.preferredLeaderId = preferredLeaderId;
}

/**
 * Returns the configured maximum leadership-transfer wait index.
 *
 * @return the current {@code maxLeadershipTransferWaitIndex} value
 */
public long getMaxLeadershipTransferWaitIndex() {
    return this.maxLeadershipTransferWaitIndex;
}

/**
 * Sets the maximum leadership-transfer wait index.
 *
 * @param maxLeadershipTransferWaitIndex the new value; stored as given, without validation
 */
public void setMaxLeadershipTransferWaitIndex(long maxLeadershipTransferWaitIndex) {
this.maxLeadershipTransferWaitIndex = maxLeadershipTransferWaitIndex;
}

/**
 * Returns the minimum vote interval used while taking leadership.
 *
 * @return the minimum interval in milliseconds
 */
public int getMinTakeLeadershipVoteIntervalMs() {
    return this.minTakeLeadershipVoteIntervalMs;
}

/**
 * Sets the minimum vote interval used while taking leadership.
 *
 * @param minTakeLeadershipVoteIntervalMs the minimum interval in milliseconds; stored without validation
 */
public void setMinTakeLeadershipVoteIntervalMs(int minTakeLeadershipVoteIntervalMs) {
this.minTakeLeadershipVoteIntervalMs = minTakeLeadershipVoteIntervalMs;
}

/**
 * Returns the maximum vote interval used while taking leadership.
 *
 * @return the maximum interval in milliseconds
 */
public int getMaxTakeLeadershipVoteIntervalMs() {
    return this.maxTakeLeadershipVoteIntervalMs;
}

/**
 * Sets the maximum vote interval used while taking leadership.
 *
 * @param maxTakeLeadershipVoteIntervalMs the maximum interval in milliseconds; stored without validation
 */
public void setMaxTakeLeadershipVoteIntervalMs(int maxTakeLeadershipVoteIntervalMs) {
this.maxTakeLeadershipVoteIntervalMs = maxTakeLeadershipVoteIntervalMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private void updatePeerWaterMark(long term, String peerId, long index) {
}
}

private long getPeerWaterMark(long term, String peerId) {
public long getPeerWaterMark(long term, String peerId) {
synchronized (peerWaterMarksByTerm) {
checkTermForWaterMark(term, "getPeerWaterMark");
return peerWaterMarksByTerm.get(term).get(peerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.HeartBeatRequest;
import io.openmessaging.storage.dledger.protocol.HeartBeatResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse;
import io.openmessaging.storage.dledger.protocol.VoteRequest;
import io.openmessaging.storage.dledger.protocol.VoteResponse;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
Expand Down Expand Up @@ -66,6 +68,8 @@ public class DLedgerLeaderElector {

private StateMaintainer stateMaintainer = new StateMaintainer("StateMaintainer", logger);

private final TakeLeadershipTask takeLeadershipTask = new TakeLeadershipTask();

public DLedgerLeaderElector(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerRpcService dLedgerRpcService) {
this.dLedgerConfig = dLedgerConfig;
this.memberState = memberState;
Expand Down Expand Up @@ -226,6 +230,10 @@ public CompletableFuture<VoteResponse> handleVote(VoteRequest request, boolean s
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.getLedgerEndTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_SMALL_THAN_LEDGER));
}

if (!self && isTakingLeadership() && request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && memberState.getLedgerEndIndex() >= request.getLedgerEndIndex()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TAKING_LEADERSHIP));
}

memberState.setCurrVoteFor(request.getLeaderId());
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.ACCEPT));
}
Expand All @@ -252,7 +260,6 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest);
future.whenComplete((HeartBeatResponse x, Throwable ex) -> {
try {

if (ex != null) {
throw ex;
}
Expand Down Expand Up @@ -356,7 +363,16 @@ private List<CompletableFuture<VoteResponse>> voteForQuorumResponses(long term,
return responses;
}

/**
 * Tells whether this node is currently trying to take leadership.
 *
 * @return {@code true} when this node is the configured preferred leader, or when the term it was
 *         asked to take leadership in equals the current term
 */
private boolean isTakingLeadership() {
    // Being the preferred leader is enough on its own; the term comparison is only the fallback.
    if (memberState.getSelfId().equals(dLedgerConfig.getPreferredLeaderId())) {
        return true;
    }
    return memberState.getTermToTakeLeadership() == memberState.currTerm();
}

/**
 * Computes the wall-clock time (ms) at which the next vote request should be sent.
 *
 * <p>While taking leadership the delay is the configured minimum take-leadership interval plus a
 * random jitter below {@code max - min}. Otherwise it is the last vote cost plus the regular
 * minimum vote interval plus a random jitter below {@code max - min}.
 *
 * <p>{@code Random.nextInt(bound)} rejects non-positive bounds, so a configuration with
 * {@code max <= min} falls back to no jitter instead of throwing.
 *
 * @return the absolute timestamp in milliseconds for the next vote request
 */
private long getNextTimeToRequestVote() {
    if (isTakingLeadership()) {
        int takeMinMs = dLedgerConfig.getMinTakeLeadershipVoteIntervalMs();
        int takeSpreadMs = dLedgerConfig.getMaxTakeLeadershipVoteIntervalMs() - takeMinMs;
        // Guard against max <= min, which would make nextInt throw IllegalArgumentException.
        int takeJitterMs = takeSpreadMs > 0 ? random.nextInt(takeSpreadMs) : 0;
        return System.currentTimeMillis() + takeMinMs + takeJitterMs;
    }
    int spreadMs = maxVoteIntervalMs - minVoteIntervalMs;
    int jitterMs = spreadMs > 0 ? random.nextInt(spreadMs) : 0;
    return System.currentTimeMillis() + lastVoteCost + minVoteIntervalMs + jitterMs;
}

Expand Down Expand Up @@ -416,6 +432,7 @@ private void maintainAsCandidate() throws Exception {
acceptedNum.incrementAndGet();
break;
case REJECT_ALREADY_VOTED:
case REJECT_TAKING_LEADERSHIP:
break;
case REJECT_ALREADY__HAS_LEADER:
alreadyHasLeader.compareAndSet(false, true);
Expand Down Expand Up @@ -444,7 +461,7 @@ private void maintainAsCandidate() throws Exception {
voteLatch.countDown();
}
} catch (Throwable t) {
logger.error("Get error when parsing vote response ", t);
logger.error("Get error when parsing vote response", t);
} finally {
allNum.incrementAndGet();
if (allNum.get() == memberState.peerSize()) {
Expand Down Expand Up @@ -496,9 +513,10 @@ private void maintainAsCandidate() throws Exception {
/**
* The core method of maintainer.
* Run the specified logic according to the current role:
* candidate => propose a vote.
* leader => send heartbeats to followers, and step down to candidate when quorum followers do not respond.
* follower => accept heartbeats, and change to candidate when no heartbeat from leader.
* candidate => propose a vote.
* leader => send heartbeats to followers, and step down to candidate when quorum followers do not respond.
* follower => accept heartbeats, and change to candidate when no heartbeat from leader.
*
* @throws Exception
*/
private void maintainState() throws Exception {
Expand All @@ -512,6 +530,12 @@ private void maintainState() throws Exception {
}

private void handleRoleChange(long term, MemberState.Role role) {
try {
takeLeadershipTask.check(term, role);
} catch (Throwable t) {
logger.error("takeLeadershipTask.check failed. ter={}, role={}", term, role, t);
}

for (RoleChangeHandler roleChangeHandler : roleChangeHandlers) {
try {
roleChangeHandler.handle(term, role);
Expand All @@ -527,6 +551,119 @@ public void addRoleChangeHandler(RoleChangeHandler roleChangeHandler) {
}
}

/**
 * Handles a leadership-transfer request addressed to this node.
 *
 * <p>The request is rejected immediately when its term differs from the current term
 * (INCONSISTENT_TERM), when this node is not the leader (NOT_LEADER), or when a transferee is
 * already set (LEADER_TRANSFERRING). Otherwise the transferee is recorded and a take-leadership
 * request is sent to it through the rpc service.
 *
 * <p>Once the rpc finishes, normally or exceptionally, the transferee marker is cleared if the
 * term is still the one the transfer was started in, so that a later transfer is not blocked.
 *
 * @param request the transfer request carrying the term and the transferee id
 * @return a future holding the response of the transferee, or an already completed rejection
 * @throws Exception if the rpc service fails to dispatch the take-leadership request
 */
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(LeadershipTransferRequest request) throws Exception {
    logger.info("handleLeadershipTransfer: {}", request);
    synchronized (memberState) {
        if (memberState.currTerm() != request.getTerm()) {
            logger.warn("[BUG] [HandleLeaderTransfer] currTerm={} != request.term={}", memberState.currTerm(), request.getTerm());
            return CompletableFuture.completedFuture(new LeadershipTransferResponse().term(memberState.currTerm()).code(DLedgerResponseCode.INCONSISTENT_TERM.getCode()));
        }

        if (!memberState.isLeader()) {
            // Report this node's own id: the message talks about "selfId", not the id carried by the request.
            logger.warn("[BUG] [HandleLeaderTransfer] selfId={} is not leader", memberState.getSelfId());
            return CompletableFuture.completedFuture(new LeadershipTransferResponse().term(memberState.currTerm()).code(DLedgerResponseCode.NOT_LEADER.getCode()));
        }

        if (memberState.getTransferee() != null) {
            logger.warn("[BUG] [HandleLeaderTransfer] transferee={} is already set", memberState.getTransferee());
            return CompletableFuture.completedFuture(new LeadershipTransferResponse().term(memberState.currTerm()).code(DLedgerResponseCode.LEADER_TRANSFERRING.getCode()));
        }

        memberState.setTransferee(request.getTransfereeId());
    }
    final long transferTerm = request.getTerm();
    LeadershipTransferRequest takeLeadershipRequest = new LeadershipTransferRequest();
    takeLeadershipRequest.setGroup(memberState.getGroup());
    takeLeadershipRequest.setLeaderId(memberState.getLeaderId());
    takeLeadershipRequest.setLocalId(memberState.getSelfId());
    takeLeadershipRequest.setRemoteId(request.getTransfereeId());
    takeLeadershipRequest.setTerm(transferTerm);
    takeLeadershipRequest.setTakeLeadershipLedgerIndex(memberState.getLedgerEndIndex());
    takeLeadershipRequest.setTransferId(memberState.getSelfId());
    takeLeadershipRequest.setTransfereeId(request.getTransfereeId());
    // The term may have moved on between releasing the lock and getting here.
    if (memberState.currTerm() != request.getTerm()) {
        logger.warn("[HandleLeaderTransfer] term changed, cur={} , request={}", memberState.currTerm(), request.getTerm());
        return CompletableFuture.completedFuture(new LeadershipTransferResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
    }

    final CompletableFuture<LeadershipTransferResponse> rpcFuture;
    try {
        rpcFuture = dLedgerRpcService.leadershipTransfer(takeLeadershipRequest);
    } catch (Exception e) {
        // The request never left this node; do not leave the transferee marker behind.
        clearTransfereeIfStillInTerm(transferTerm);
        throw e;
    }

    // whenComplete runs on normal AND exceptional completion and passes the original
    // result or exception through unchanged.
    return rpcFuture.whenComplete((response, ex) -> {
        if (ex != null) {
            logger.warn("leadershipTransfer rpc completed exceptionally", ex);
        }
        clearTransfereeIfStillInTerm(transferTerm);
    });
}

/**
 * Clears the transferee marker when the node is still in {@code term} and a transferee is set.
 */
private void clearTransfereeIfStillInTerm(long term) {
    synchronized (memberState) {
        if (memberState.currTerm() == term && memberState.getTransferee() != null) {
            logger.warn("leadershipTransfer failed, set transferee to null");
            memberState.setTransferee(null);
        }
    }
}

/**
 * Handles a request asking this node to take over leadership.
 *
 * <p>When the request term matches the current term, the node records {@code term + 1} as the
 * term in which it intends to take leadership, registers a pending response with the
 * take-leadership task, switches to candidate for that term and asks for an immediate term
 * increase. The returned future is completed later by the take-leadership task.
 *
 * @param request the take-leadership request
 * @return a pending future for the outcome, or a completed INCONSISTENT_TERM response when the
 *         request term does not match the current term
 * @throws Exception declared by the signature; propagated from the invoked helpers
 */
public CompletableFuture<LeadershipTransferResponse> handleTakeLeadership(LeadershipTransferRequest request) throws Exception {
    logger.debug("handleTakeLeadership.request={}", request);
    synchronized (memberState) {
        if (memberState.currTerm() == request.getTerm()) {
            final long nextTerm = request.getTerm() + 1;
            memberState.setTermToTakeLeadership(nextTerm);
            final CompletableFuture<LeadershipTransferResponse> pending = new CompletableFuture<>();
            takeLeadershipTask.update(request, pending);
            changeRoleToCandidate(nextTerm);
            needIncreaseTermImmediately = true;
            return pending;
        }

        logger.warn("[BUG] [handleTakeLeadership] currTerm={} != request.term={}", memberState.currTerm(), request.getTerm());
        final LeadershipTransferResponse rejection = new LeadershipTransferResponse()
            .term(memberState.currTerm())
            .code(DLedgerResponseCode.INCONSISTENT_TERM.getCode());
        return CompletableFuture.completedFuture(rejection);
    }
}

/**
 * Holds the outstanding take-leadership request of this node together with the future that
 * must be completed once a role change decides its outcome.
 */
private class TakeLeadershipTask {
    private LeadershipTransferRequest request;
    private CompletableFuture<LeadershipTransferResponse> responseFuture;

    /**
     * Registers the request being served and the future to complete for it.
     * A previously registered pair is overwritten without its future being completed.
     */
    public synchronized void update(LeadershipTransferRequest request, CompletableFuture<LeadershipTransferResponse> responseFuture) {
        this.request = request;
        this.responseFuture = responseFuture;
    }

    /**
     * Evaluates a role change against the pending take-leadership attempt and, when the outcome
     * is decided, completes the pending future and resets the task.
     *
     * @param term the term reported with the role change
     * @param role the role reported with the role change
     */
    public synchronized void check(long term, MemberState.Role role) {
        logger.trace("TakeLeadershipTask called, term={}, role={}", term, role);
        if (memberState.getTermToTakeLeadership() == -1 || responseFuture == null) {
            // Nothing pending.
            return;
        }

        final DLedgerResponseCode outcome = decideOutcome(term, role);
        if (outcome == null) {
            // Still undecided; wait for the next role change.
            return;
        }

        final LeadershipTransferResponse response = new LeadershipTransferResponse().term(term).code(outcome.getCode());
        responseFuture.complete(response);
        logger.info("TakeLeadershipTask finished. request={}, response={}, term={}, role={}", request, response, term, role);
        memberState.setTermToTakeLeadership(-1);
        responseFuture = null;
        request = null;
    }

    /**
     * Maps a (term, role) pair to the response code of the pending attempt.
     *
     * @return the code to answer with, or {@code null} while the outcome is still open
     */
    private DLedgerResponseCode decideOutcome(long term, MemberState.Role role) {
        if (term > memberState.getTermToTakeLeadership()) {
            return DLedgerResponseCode.EXPIRED_TERM;
        }

        if (term == memberState.getTermToTakeLeadership()) {
            switch (role) {
                case LEADER:
                    return DLedgerResponseCode.SUCCESS;
                case FOLLOWER:
                    return DLedgerResponseCode.TAKE_LEADERSHIP_FAILED;
                default:
                    return null;
            }
        }

        switch (role) {
            /*
             * A heartbeat may arrive before this node, as a candidate, has increased its term;
             * it then becomes a follower in a term below the one it meant to take leadership in.
             */
            case FOLLOWER:
                return DLedgerResponseCode.TAKE_LEADERSHIP_FAILED;
            default:
                return DLedgerResponseCode.INTERNAL_ERROR;
        }
    }
}

public interface RoleChangeHandler {
void handle(long term, MemberState.Role role);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.openmessaging.storage.dledger.protocol.GetEntriesResponse;
import io.openmessaging.storage.dledger.protocol.HeartBeatRequest;
import io.openmessaging.storage.dledger.protocol.HeartBeatResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse;
import io.openmessaging.storage.dledger.protocol.MetadataRequest;
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
Expand All @@ -36,11 +38,14 @@
import io.openmessaging.storage.dledger.protocol.RequestOrResponse;
import io.openmessaging.storage.dledger.protocol.VoteRequest;
import io.openmessaging.storage.dledger.protocol.VoteResponse;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
Expand Down Expand Up @@ -99,6 +104,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
this.remotingServer.registerProcessor(DLedgerRequestCode.PUSH.getCode(), protocolProcessor, null);
this.remotingServer.registerProcessor(DLedgerRequestCode.VOTE.getCode(), protocolProcessor, null);
this.remotingServer.registerProcessor(DLedgerRequestCode.HEART_BEAT.getCode(), protocolProcessor, null);
this.remotingServer.registerProcessor(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), protocolProcessor, null);

//start the remoting client
this.remotingClient = new NettyRemotingClient(new NettyClientConfig(), null);
Expand Down Expand Up @@ -201,8 +207,29 @@ private String getPeerAddr(RequestOrResponse request) {
return future;
}

/**
 * Sends a leadership-transfer request to the peer addressed by {@code request}.
 *
 * <p>The returned future is always completed. When the request cannot be sent, or the callback
 * fires without a response command, it completes with a NETWORK_ERROR response. When the
 * response body cannot be parsed it completes with an INTERNAL_ERROR response.
 *
 * @param request the request to send; its body is serialized as JSON
 * @return a future holding the peer's response or a locally built error response
 * @throws Exception declared by the interface; failures are reported through the future
 */
@Override
public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(LeadershipTransferRequest request) throws Exception {
    CompletableFuture<LeadershipTransferResponse> future = new CompletableFuture<>();
    try {
        RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), null);
        wrapperRequest.setBody(JSON.toJSONBytes(request));
        remotingClient.invokeAsync(getPeerAddr(request), wrapperRequest, 3000, responseFuture -> {
            try {
                RemotingCommand responseCommand = responseFuture.getResponseCommand();
                if (responseCommand == null) {
                    // No response command: dereferencing it would throw and leave the future pending forever.
                    logger.warn("leadershipTransfer got no response {}", request.baseInfo());
                    future.complete(failedLeadershipTransfer(request, DLedgerResponseCode.NETWORK_ERROR));
                    return;
                }
                future.complete(JSON.parseObject(responseCommand.getBody(), LeadershipTransferResponse.class));
            } catch (Throwable t) {
                logger.error("Parse leadershipTransfer response failed {}", request.baseInfo(), t);
                future.complete(failedLeadershipTransfer(request, DLedgerResponseCode.INTERNAL_ERROR));
            }
        });
    } catch (Throwable t) {
        logger.error("Send leadershipTransfer request failed {}", request.baseInfo(), t);
        future.complete(failedLeadershipTransfer(request, DLedgerResponseCode.NETWORK_ERROR));
    }

    return future;
}

/**
 * Builds an error response that copies the request's base info and carries the given code.
 */
private LeadershipTransferResponse failedLeadershipTransfer(LeadershipTransferRequest request, DLedgerResponseCode code) {
    LeadershipTransferResponse response = new LeadershipTransferResponse();
    response.copyBaseInfo(request);
    response.setCode(code.getCode());
    return response;
}

private void writeResponse(RequestOrResponse storeResp, Throwable t, RemotingCommand request,
ChannelHandlerContext ctx) {
ChannelHandlerContext ctx) {
RemotingCommand response = null;
try {
if (t != null) {
Expand All @@ -221,10 +248,10 @@ private void writeResponse(RequestOrResponse storeResp, Throwable t, RemotingCom
/**
* The core method to handle rpc requests.
* The advantages of using future instead of callback:
*
* <p>
* 1. separate the caller from actual executor, which make it able to handle the future results by the caller's wish
* 2. simplify the later execution method
*
* <p>
* CompletableFuture is an excellent choice, whenCompleteAsync will handle the response asynchronously.
* With an independent thread-pool, it will improve performance and reduce blocking points.
*
Expand Down Expand Up @@ -292,13 +319,29 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
}, futureExecutor);
break;
}
case LEADERSHIP_TRANSFER: {
long start = System.currentTimeMillis();
LeadershipTransferRequest leadershipTransferRequest = JSON.parseObject(request.getBody(), LeadershipTransferRequest.class);
CompletableFuture<LeadershipTransferResponse> future = handleLeadershipTransfer(leadershipTransferRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
logger.info("LEADERSHIP_TRANSFER FINISHED. Request={}, response={}, cost={}ms",
request, x, DLedgerUtils.elapsed(start));
}, futureExecutor);
break;
}
default:
logger.error("Unknown request code {} from {}", request.getCode(), request);
break;
}
return null;
}

/**
 * Forwards an incoming leadership-transfer request to the DLedger server.
 *
 * @param leadershipTransferRequest the request received over rpc
 * @return the future returned by the server for this request
 * @throws Exception if the server rejects the request by throwing
 */
@Override
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(LeadershipTransferRequest leadershipTransferRequest) throws Exception {
    final CompletableFuture<LeadershipTransferResponse> delegated =
        dLedgerServer.handleLeadershipTransfer(leadershipTransferRequest);
    return delegated;
}

@Override
public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest request) throws Exception {
return dLedgerServer.handleHeartBeat(request);
Expand Down
Loading

0 comments on commit 34abd28

Please sign in to comment.