From b6b2136810cf63ce59c37efeca8bfb2e5ccf48b8 Mon Sep 17 00:00:00 2001 From: llIlll <10194588+llIlll@users.noreply.github.com> Date: Mon, 28 Dec 2020 15:40:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96raft=E7=9B=B8=E5=85=B3=20(#32?= =?UTF-8?q?6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 优化故障时心跳处理 * 复制消费位置改为raft判断 * 修复commitPosition判断问题 * transport添加快速失败,避免阻塞线程 * 添加复制任务补偿 * 补充日志 --- MAINTAINERS.md | 6 --- .../main/java/org/joyqueue/domain/Topic.java | 11 ++++ .../network/transport/RequestBarrier.java | 8 +++ .../transport/codec/DefaultEncoder.java | 12 ----- .../transport/codec/JoyQueueHeader.java | 3 +- .../transport/config/TransportConfig.java | 10 ++++ .../config/TransportConfigSupport.java | 3 ++ .../support/DefaultChannelTransport.java | 28 ++++++++-- .../broker/election/ElectionConfig.java | 8 +++ .../broker/election/ElectionConfigKey.java | 2 + .../broker/election/RaftLeaderElection.java | 28 ++++++---- .../command/ReplicateConsumePosRequest.java | 35 ++++++++++-- .../ReplicateConsumePosRequestHandler.java | 53 +++++++++++++++---- .../ReplicateConsumePosRequestDecoder.java | 36 +++++++++---- .../ReplicateConsumePosRequestEncoder.java | 9 +++- .../manage/service/ElectionManageService.java | 9 ++++ .../support/DefaultBrokerManageService.java | 5 ++ .../support/DefaultElectionManageService.java | 12 +++++ .../broker/replication/ReplicaGroup.java | 38 +++++++++---- .../replication/ReplicationManager.java | 1 + .../src/main/resources/manage/routing.xml | 2 + .../election/ElectionCommandCodecTest.java | 8 +-- pom.xml | 10 ---- 23 files changed, 255 insertions(+), 82 deletions(-) delete mode 100644 MAINTAINERS.md diff --git a/MAINTAINERS.md b/MAINTAINERS.md deleted file mode 100644 index 5c83d5c71..000000000 --- a/MAINTAINERS.md +++ /dev/null @@ -1,6 +0,0 @@ - - -The JoqQueue maintainers are: - -* Li yue([liyue2008](https://github.com/liyue2008)) liyue2008@gmail.com -* Wang jin([rudy2steiner](https://github.com/rudy2steiner)) rudy_steiner@163.com \ No newline at end of file diff --git a/joyqueue-common/joyqueue-model/src/main/java/org/joyqueue/domain/Topic.java b/joyqueue-common/joyqueue-model/src/main/java/org/joyqueue/domain/Topic.java index 66965d08c..cf63a8b22 100644 --- a/joyqueue-common/joyqueue-model/src/main/java/org/joyqueue/domain/Topic.java +++ b/joyqueue-common/joyqueue-model/src/main/java/org/joyqueue/domain/Topic.java @@ -107,6 +107,17 @@ public int hashCode() { return Objects.hash(name, partitions, type, priorityPartitions, policy); } + @Override + public String toString() { + return "Topic{" + + "name=" + name + + ", partitions=" + partitions + + ", type=" + type + + ", priorityPartitions=" + priorityPartitions + + ", policy=" + policy + + '}'; + } + public static class TopicPolicy implements Serializable { private Long storeMaxTime; private Boolean storeCleanKeepUnconsumed; diff --git a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/RequestBarrier.java b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/RequestBarrier.java index f6ccf8245..4d35f1be8 100644 --- a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/RequestBarrier.java +++ b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/RequestBarrier.java @@ -201,6 +201,14 @@ public void acquire(final SemaphoreType type, final long timeout) throws Transpo } } + public boolean tryAcquire(final SemaphoreType type) throws TransportException { + if (type == null) { + return false; + } + Semaphore semaphore = type == SemaphoreType.ASYNC ? asyncSemaphore : onewaySemaphore; + return semaphore.tryAcquire(); + } + /** * 释放信号量 * diff --git a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/codec/DefaultEncoder.java b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/codec/DefaultEncoder.java index 6917f34c0..dc52c2153 100644 --- a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/codec/DefaultEncoder.java +++ b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/codec/DefaultEncoder.java @@ -66,19 +66,7 @@ public void encode(Object obj, ByteBuf buffer) throws TransportException.CodecEx if (payload instanceof JoyQueuePayload) { ((JoyQueuePayload) payload).setHeader(header); } - - int oldVersion = header.getVersion(); - if (payload.getClass().getName().equals("org.joyqueue.nsr.network.command.CreatePartitionGroup") - || payload.getClass().getName().equals("org.joyqueue.nsr.network.command.RemovePartitionGroup") - || payload.getClass().getName().equals("org.joyqueue.nsr.network.command.UpdatePartitionGroup")) { - header.setVersion(JoyQueueHeader.VERSION_V2); - oldVersion = JoyQueueHeader.VERSION_V2; - } else { - header.setVersion(JoyQueueHeader.CURRENT_VERSION); - } headerCodec.encode(header, buffer); - - header.setVersion(oldVersion); encoder.encode((Payload) payload, buffer); } else { header.setVersion(JoyQueueHeader.CURRENT_VERSION); diff --git a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/codec/JoyQueueHeader.java b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/codec/JoyQueueHeader.java index 62553c205..3ec2c08ea 100644 --- a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/codec/JoyQueueHeader.java +++ b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/codec/JoyQueueHeader.java @@ -34,8 +34,9 @@ public class JoyQueueHeader implements Header { public static final byte VERSION_V1 = 1; public static final byte VERSION_V2 = 2; public static final byte VERSION_V3 = 3; + public static final byte VERSION_V4 = 4; - public static final byte CURRENT_VERSION = VERSION_V3; + public static final byte CURRENT_VERSION = VERSION_V4; public static final int MAGIC = 0xCAFEBEBE; diff --git a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/config/TransportConfig.java b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/config/TransportConfig.java index d37cfeef6..612e6427b 100644 --- a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/config/TransportConfig.java +++ b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/config/TransportConfig.java @@ -60,6 +60,8 @@ public class TransportConfig { private boolean nonBlockOneway = false; // 非阻塞异步 private boolean nonBlockAsync = false; + // 快速失败 + private boolean fastfailAsync = false; // 最大异步请求数 private int maxAsync = 10240; // 异步回调线程数量 @@ -281,6 +283,14 @@ public boolean isNonBlockOneway() { return nonBlockOneway; } + public boolean isFastfailAsync() { + return fastfailAsync; + } + + public void setFastfailAsync(boolean fastfailAsync) { + this.fastfailAsync = fastfailAsync; + } + public void setNonBlockAsync(boolean nonBlockAsync) { this.nonBlockAsync = nonBlockAsync; } diff --git a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/config/TransportConfigSupport.java b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/config/TransportConfigSupport.java index cfe7d926d..1548e408a 100644 --- a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/config/TransportConfigSupport.java +++ b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/config/TransportConfigSupport.java @@ -98,6 +98,8 @@ public static final TransportConfig buildTransportConfig(final PropertySupplier transportConfig.setNonBlockOneway(property.getBoolean()); } else if (fullKey.equals(keyPrefix + TRANSPORT_CALL_NON_BLOCK_ASYNC)) { transportConfig.setNonBlockAsync(property.getBoolean()); + } else if (fullKey.equals(keyPrefix + TRANSPORT_CALL_FAST_FAIL_ASYNC)) { + transportConfig.setFastfailAsync(property.getBoolean()); } else if (fullKey.equals(keyPrefix + TRANSPORT_SEND_TIMEOUT)) { transportConfig.setSendTimeout(property.getInteger()); } else if (fullKey.equals(keyPrefix + TRANSPORT_RETRY_DELAY)) { @@ -132,6 +134,7 @@ public static final TransportConfig buildTransportConfig(final PropertySupplier public static final String TRANSPORT_CALL_BACK_THREADS = "transport.callbackThreads"; public static final String TRANSPORT_CALL_NON_BLOCK_ONEWAY = "transport.nonBlockOneway"; public static final String TRANSPORT_CALL_NON_BLOCK_ASYNC = "transport.nonBlockAsync"; + public static final String TRANSPORT_CALL_FAST_FAIL_ASYNC = "transport.fastfailAsync"; public static final String TRANSPORT_SEND_TIMEOUT = "transport.sendTimeout"; public static final String TRANSPORT_RETRY_DELAY = "transport.retryDelay"; public static final String TRANSPORT_RETRY_MAX = "transport.retryMax"; diff --git a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/support/DefaultChannelTransport.java b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/support/DefaultChannelTransport.java index 950da8329..9949fb08b 100644 --- a/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/support/DefaultChannelTransport.java +++ b/joyqueue-common/joyqueue-network/src/main/java/org/joyqueue/network/transport/support/DefaultChannelTransport.java @@ -15,6 +15,9 @@ */ package org.joyqueue.network.transport.support; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import org.joyqueue.domain.QosLevel; import org.joyqueue.network.transport.ChannelTransport; import org.joyqueue.network.transport.RequestBarrier; @@ -30,9 +33,6 @@ import org.joyqueue.network.transport.exception.TransportException; import org.joyqueue.toolkit.network.IpUtil; import org.joyqueue.toolkit.time.SystemClock; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,7 +151,19 @@ public void async(final Command command, final long timeout, CommandCallback cal long sendTimeout = timeout <= 0 ? barrier.getSendTimeout() : timeout; // 获取信号量 if (!config.isNonBlockAsync()) { - barrier.acquire(RequestBarrier.SemaphoreType.ASYNC, sendTimeout); + if (config.isFastfailAsync()) { + if (!barrier.tryAcquire(RequestBarrier.SemaphoreType.ASYNC)) { + callback.onException(command, TransportException.RequestExcessiveException.build()); + return; + } + } else { + try { + barrier.acquire(RequestBarrier.SemaphoreType.ASYNC, timeout); + } catch (Exception e) { + callback.onException(command, e); + return; + } + } } try { @@ -217,7 +229,13 @@ public void oneway(final Command command, final long timeout) throws TransportEx long time = SystemClock.now(); // 获取信号量 if (!config.isNonBlockOneway()) { - barrier.acquire(RequestBarrier.SemaphoreType.ONEWAY, sendTimeout); + if (config.isFastfailAsync()) { + if (!barrier.tryAcquire(RequestBarrier.SemaphoreType.ONEWAY)) { + throw TransportException.RequestExcessiveException.build(); + } + } else { + barrier.acquire(RequestBarrier.SemaphoreType.ONEWAY, timeout); + } } try { diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/ElectionConfig.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/ElectionConfig.java index 5ddb351d8..eb4cce53e 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/ElectionConfig.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/ElectionConfig.java @@ -81,6 +81,10 @@ public int getHeartbeatTimeout() { return PropertySupplier.getValue(propertySupplier, ElectionConfigKey.HEARTBEAT_TIMEOUT); } + public int getHeartbeatMaxTimeout() { + return PropertySupplier.getValue(propertySupplier, ElectionConfigKey.HEARTBEAT_MAX_TIMEOUT); + } + public int getSendCommandTimeout() { return PropertySupplier.getValue(propertySupplier, ElectionConfigKey.SEND_COMMAND_TIMEOUT); } @@ -161,6 +165,10 @@ public boolean enableSharedHeartbeat() { return PropertySupplier.getValue(propertySupplier, ElectionConfigKey.ENABLE_SHARED_HEARTBEAT); } + public boolean enableReplicatePositionV3Protocol() { + return PropertySupplier.getValue(propertySupplier, ElectionConfigKey.ENABLE_REPLICATE_POSITION_V3_PROTOCOL); + } + public void setListenPort(String port) { listenPort = Integer.valueOf(port); } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/ElectionConfigKey.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/ElectionConfigKey.java index 951386c71..4e158baed 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/ElectionConfigKey.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/ElectionConfigKey.java @@ -30,6 +30,7 @@ public enum ElectionConfigKey implements PropertyDef { EXECUTOR_THREAD_NUM_MAX("election.executor.thread.num.max", 50, Type.INT), TIMER_SCHEDULE_THREAD_NUM("election.timer.schedule.thread.num", 10, Type.INT), HEARTBEAT_TIMEOUT("election.heartbeat.timeout", 1000, Type.INT), + HEARTBEAT_MAX_TIMEOUT("election.heartbeat.max.timeout", 1000 * 30, Type.INT), SEND_COMMAND_TIMEOUT("election.send.command.timeout", 1000 * 5, Type.INT), MAX_BATCH_REPLICATE_SIZE("election.max.replicate.length", 1024 * 1024 * 3, Type.INT), DISABLE_STORE_TIMEOUT("election.disable.store.timeout", 1000 * 5, Type.INT), @@ -50,6 +51,7 @@ public enum ElectionConfigKey implements PropertyDef { CONNECTION_TIMEOUT("election.connection.timeout", 100 * 1, Type.INT), CONNECTION_RETRY_DELAY("election.connection.retryDelay", 1000 * 10, Type.INT), ENABLE_SHARED_HEARTBEAT("election.enable.shared.heartbeat", false, Type.BOOLEAN), + ENABLE_REPLICATE_POSITION_V3_PROTOCOL("election.enable.replicate.position.v3.protocol", false, Type.BOOLEAN), ; diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/RaftLeaderElection.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/RaftLeaderElection.java index 15dca88b6..1269d57d5 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/RaftLeaderElection.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/RaftLeaderElection.java @@ -23,6 +23,7 @@ import org.joyqueue.broker.election.command.TimeoutNowResponse; import org.joyqueue.broker.election.command.VoteRequest; import org.joyqueue.broker.election.command.VoteResponse; +import org.joyqueue.broker.replication.Replica; import org.joyqueue.broker.replication.ReplicaGroup; import org.joyqueue.domain.PartitionGroup; import org.joyqueue.domain.TopicConfig; @@ -153,7 +154,7 @@ protected void doStop() { super.doStop(); - logger.info("Raft leader election of partition group {}/node {} stoped", + logger.info("Raft leader election of partition group {}/node {} stopped", topicPartitionGroup, localNode); } @@ -260,6 +261,10 @@ public void setLeaderId(int leaderId) throws Exception { } } + public int getCurrentTerm() { + return currentTerm; + } + /** * 切换状态 * @param state 切换到的状态 @@ -784,7 +789,7 @@ public synchronized Command handleAppendEntriesRequest(AppendEntriesRequest requ if (request.getTerm() < currentTerm) { logger.info("Partition group {}/node {} receive append entries request from {}, current term {} " + "is bigger than request term {}, length is {}", - topicPartitionGroup, localNode, currentTerm, request.getLeaderId(), + topicPartitionGroup, localNode, request.getLeaderId(), currentTerm, request.getTerm(), request.getEntriesLength()); return new Command(new JoyQueueHeader(Direction.RESPONSE, CommandType.RAFT_APPEND_ENTRIES_RESPONSE), new AppendEntriesResponse.Build().success(false).term(currentTerm) @@ -807,14 +812,6 @@ public synchronized Command handleAppendEntriesRequest(AppendEntriesRequest requ } } - private synchronized void maybeStartNewHeartbeat() { - if (electionConfig.enableSharedHeartbeat()) { - startNewHeartbeat(); - } else { - resetHeartbeatTimer(); - } - } - /** * 开始新一轮心跳,向Follower节点发送心跳命令,重置心跳定时器 */ @@ -835,6 +832,15 @@ private synchronized void startNewHeartbeat() { if (node.equals(localNode)) { continue; } + + if (!electionConfig.enableSharedHeartbeat()) { + Replica replica = replicaGroup.getReplica(node.getNodeId()); + if (replica != null && replica.getLastAppendTime() != 0 + && SystemClock.now() - replica.getLastAppendTime() > electionConfig.getHeartbeatMaxTimeout()) { + continue; + } + } + try { electionExecutor.submit(() -> { JoyQueueHeader header = new JoyQueueHeader(Direction.REQUEST, CommandType.RAFT_APPEND_ENTRIES_REQUEST); @@ -927,7 +933,7 @@ private synchronized void resetHeartbeatTimer() { heartbeatTimerFuture.cancel(true); heartbeatTimerFuture = null; } - heartbeatTimerFuture = electionTimerExecutor.schedule(this::maybeStartNewHeartbeat, + heartbeatTimerFuture = electionTimerExecutor.schedule(this::startNewHeartbeat, electionConfig.getHeartbeatTimeout(), TimeUnit.MILLISECONDS); } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/command/ReplicateConsumePosRequest.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/command/ReplicateConsumePosRequest.java index 5ba0492c0..fe95d63e4 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/command/ReplicateConsumePosRequest.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/command/ReplicateConsumePosRequest.java @@ -29,6 +29,10 @@ */ public class ReplicateConsumePosRequest extends JoyQueuePayload { private Map consumePositions; + private int term; + private int leaderId; + private String topic; + private int group; public Map getConsumePositions() { return consumePositions; @@ -38,11 +42,36 @@ public void setConsumePositions(Map consumePositions this.consumePositions = consumePositions; } - public ReplicateConsumePosRequest(Map consumePositions) { - this.consumePositions = consumePositions; + public int getTerm() { + return term; + } + + public void setTerm(int term) { + this.term = term; + } + + public int getLeaderId() { + return leaderId; + } + + public void setLeaderId(int leaderId) { + this.leaderId = leaderId; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public int getGroup() { + return group; } - public ReplicateConsumePosRequest() { + public void setGroup(int group) { + this.group = group; } @Override diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/handler/ReplicateConsumePosRequestHandler.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/handler/ReplicateConsumePosRequestHandler.java index 04b115298..eb9646a97 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/handler/ReplicateConsumePosRequestHandler.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/handler/ReplicateConsumePosRequestHandler.java @@ -18,11 +18,13 @@ import com.google.common.base.Preconditions; import org.joyqueue.broker.BrokerContext; import org.joyqueue.broker.consumer.Consume; -import org.joyqueue.broker.consumer.model.ConsumePartition; -import org.joyqueue.broker.consumer.position.model.Position; import org.joyqueue.broker.election.ElectionConfig; +import org.joyqueue.broker.election.ElectionService; +import org.joyqueue.broker.election.LeaderElection; +import org.joyqueue.broker.election.RaftLeaderElection; import org.joyqueue.broker.election.command.ReplicateConsumePosRequest; import org.joyqueue.broker.election.command.ReplicateConsumePosResponse; +import org.joyqueue.domain.TopicName; import org.joyqueue.network.command.CommandType; import org.joyqueue.network.transport.Transport; import org.joyqueue.network.transport.codec.JoyQueueHeader; @@ -34,8 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - /** * author: zhuduohui * email: zhuduohui@jd.com @@ -46,6 +46,7 @@ public class ReplicateConsumePosRequestHandler implements CommandHandler, Type { private Consume consume; private ElectionConfig electionConfig; + private ElectionService electionService; public ReplicateConsumePosRequestHandler(ElectionConfig electionConfig, Consume consume) { Preconditions.checkArgument(consume != null, "consume is null"); @@ -60,6 +61,7 @@ public ReplicateConsumePosRequestHandler(BrokerContext brokerContext) { this.consume = brokerContext.getConsume(); this.electionConfig = new ElectionConfig(brokerContext.getPropertySupplier()); + this.electionService = brokerContext.getElectionService(); } @Override @@ -81,18 +83,47 @@ public Command handle(Transport transport, Command command) throws TransportExce logger.info("Receive consume pos request {}", request.getConsumePositions()); } - try { - Map consumePositions = request.getConsumePositions(); - consume.setConsumePosition(consumePositions); - response.setSuccess(true); - } catch (Exception e) { - logger.warn("Set consume info {} fail", request.getConsumePositions(), e); - response.setSuccess(false); + if (request.getHeader().getVersion() >= JoyQueueHeader.VERSION_V4) { + handleV4Protocol(request, response); + } else { + try { + consume.setConsumePosition(request.getConsumePositions()); + response.setSuccess(true); + } catch (Exception e) { + logger.warn("Set consume info {} fail", request.getConsumePositions(), e); + response.setSuccess(false); + } } return new Command(header, response); } + protected void handleV4Protocol(ReplicateConsumePosRequest request, ReplicateConsumePosResponse response) { + LeaderElection leaderElection = electionService.getLeaderElection(TopicName.parse(request.getTopic()), request.getGroup()); + if (leaderElection == null) { + logger.warn("Set consume info fail, election is null, topic: {}, group: {}, term: {}, leaderId: {}", + request.getTopic(), request.getGroup(), request.getTerm(), request.getLeaderId()); + response.setSuccess(false); + return; + } + + if (!(leaderElection instanceof RaftLeaderElection)) { + consume.setConsumePosition(request.getConsumePositions()); + response.setSuccess(true); + return; + } + + RaftLeaderElection raftLeaderElection = (RaftLeaderElection) leaderElection; + if (raftLeaderElection.getCurrentTerm() == request.getTerm() && raftLeaderElection.getLeaderId() == request.getLeaderId()) { + consume.setConsumePosition(request.getConsumePositions()); + response.setSuccess(true); + } else { + logger.warn("Set consume info fail, topic: {}, group: {}, term: {}, leaderId: {}, currentTerm: {}, currentLeaderId: {}", + request.getTopic(), request.getGroup(), request.getTerm(), request.getLeaderId(), raftLeaderElection.getCurrentTerm(), raftLeaderElection.getLeaderId()); + response.setSuccess(false); + } + } + @Override public int type() { return CommandType.REPLICATE_CONSUME_POS_REQUEST; diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/network/codec/ReplicateConsumePosRequestDecoder.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/network/codec/ReplicateConsumePosRequestDecoder.java index 78afcf67d..46d87edd6 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/network/codec/ReplicateConsumePosRequestDecoder.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/network/codec/ReplicateConsumePosRequestDecoder.java @@ -17,6 +17,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; +import io.netty.buffer.ByteBuf; import org.joyqueue.broker.consumer.model.ConsumePartition; import org.joyqueue.broker.consumer.position.model.Position; import org.joyqueue.broker.election.command.ReplicateConsumePosRequest; @@ -25,7 +26,6 @@ import org.joyqueue.network.transport.codec.JoyQueueHeader; import org.joyqueue.network.transport.codec.PayloadDecoder; import org.joyqueue.network.transport.command.Type; -import io.netty.buffer.ByteBuf; import java.util.Map; @@ -38,19 +38,37 @@ public class ReplicateConsumePosRequestDecoder implements PayloadDecoder, Type { @Override public Object decode(final JoyQueueHeader header, final ByteBuf buffer) throws Exception { - String consumePositions; + String consumePositionString; + Map consumePositionsMap = null; + int term = 0; + int leaderId = 0; + String topic = null; + int group = 0; + if (header.getVersion() == JoyQueueHeader.VERSION_V1) { - consumePositions = Serializer.readString(buffer, Serializer.SHORT_SIZE); + consumePositionString = Serializer.readString(buffer, Serializer.SHORT_SIZE); } else { - consumePositions = Serializer.readString(buffer, Serializer.INT_SIZE); + consumePositionString = Serializer.readString(buffer, Serializer.INT_SIZE); } - if (consumePositions != null) { - Map connections = JSON.parseObject(consumePositions, new TypeReference>() { - }); - return new ReplicateConsumePosRequest(connections); + if (consumePositionString != null) { + consumePositionsMap = JSON.parseObject(consumePositionString, new TypeReference>() {}); } - return new ReplicateConsumePosRequest(); + + if (header.getVersion() >= JoyQueueHeader.VERSION_V4) { + term = buffer.readInt(); + leaderId = buffer.readInt(); + topic = Serializer.readString(buffer); + group = buffer.readInt(); + } + + ReplicateConsumePosRequest replicateConsumePosRequest = new ReplicateConsumePosRequest(); + replicateConsumePosRequest.setConsumePositions(consumePositionsMap); + replicateConsumePosRequest.setLeaderId(leaderId); + replicateConsumePosRequest.setTerm(term); + replicateConsumePosRequest.setTopic(topic); + replicateConsumePosRequest.setGroup(group); + return replicateConsumePosRequest; } @Override diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/network/codec/ReplicateConsumePosRequestEncoder.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/network/codec/ReplicateConsumePosRequestEncoder.java index 1709739d2..fe8ded32a 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/network/codec/ReplicateConsumePosRequestEncoder.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/election/network/codec/ReplicateConsumePosRequestEncoder.java @@ -16,6 +16,7 @@ package org.joyqueue.broker.election.network.codec; import com.alibaba.fastjson.JSON; +import io.netty.buffer.ByteBuf; import org.joyqueue.broker.consumer.model.ConsumePartition; import org.joyqueue.broker.consumer.position.model.Position; import org.joyqueue.broker.election.command.ReplicateConsumePosRequest; @@ -24,7 +25,6 @@ import org.joyqueue.network.transport.codec.JoyQueueHeader; import org.joyqueue.network.transport.codec.PayloadEncoder; import org.joyqueue.network.transport.command.Type; -import io.netty.buffer.ByteBuf; import java.util.Map; @@ -48,6 +48,13 @@ public void encode(final ReplicateConsumePosRequest request, ByteBuf buffer) thr } else { Serializer.write(JSON.toJSONString(request.getConsumePositions()), buffer, bodyLength); } + + if (request.getHeader().getVersion() >= JoyQueueHeader.VERSION_V4) { + buffer.writeInt(request.getTerm()); + buffer.writeInt(request.getLeaderId()); + Serializer.write(request.getTopic(), buffer); + buffer.writeInt(request.getGroup()); + } } @Override diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/manage/service/ElectionManageService.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/manage/service/ElectionManageService.java index f8bfdf8ef..651c6d9b1 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/manage/service/ElectionManageService.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/manage/service/ElectionManageService.java @@ -45,4 +45,13 @@ public interface ElectionManageService { * @param term 轮次 */ void updateTerm(String topic, int partitionGroup, int term); + + /** + * 添加复制任务 + * @param topic + * @param partitionGroup + * @param replicaId + * @return + */ + boolean addReplicaTask(String topic, int partitionGroup, int replicaId); } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/manage/service/support/DefaultBrokerManageService.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/manage/service/support/DefaultBrokerManageService.java index 12df4332b..ea2ce6530 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/manage/service/support/DefaultBrokerManageService.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/manage/service/support/DefaultBrokerManageService.java @@ -253,6 +253,11 @@ public void updateTerm(String topic, int partitionGroup, int term) { electionManageService.updateTerm(topic, partitionGroup, term); } + @Override + public boolean addReplicaTask(String topic, int partitionGroup, int replicaId) { + return electionManageService.addReplicaTask(topic, partitionGroup, replicaId); + } + @Override public Directory storeTreeView(boolean recursive) { return storeManageService.storeTreeView(recursive); diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/manage/service/support/DefaultElectionManageService.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/manage/service/support/DefaultElectionManageService.java index addc0a88c..e77b0c149 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/manage/service/support/DefaultElectionManageService.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/manage/service/support/DefaultElectionManageService.java @@ -16,7 +16,10 @@ package org.joyqueue.broker.manage.service.support; import org.joyqueue.broker.election.ElectionService; +import org.joyqueue.broker.election.LeaderElection; +import org.joyqueue.broker.election.RaftLeaderElection; import org.joyqueue.broker.manage.service.ElectionManageService; +import org.joyqueue.domain.TopicName; public class DefaultElectionManageService implements ElectionManageService { private ElectionService electionService; @@ -44,4 +47,13 @@ public String describeTopic(String topic, int partitionGroup) { public void updateTerm(String topic, int partitionGroup, int term) { electionService.updateTerm(topic, partitionGroup, term); } + + @Override + public boolean addReplicaTask(String topic, int partitionGroup, int replicaId) { + LeaderElection leaderElection = electionService.getLeaderElection(TopicName.parse(topic), partitionGroup); + if (leaderElection == null || !(leaderElection instanceof RaftLeaderElection)) { + return false; + } + return leaderElection.getReplicaGroup().addReplicaTask(replicaId); + } } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/replication/ReplicaGroup.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/replication/ReplicaGroup.java index 1a51fd3c1..d1b37bb7e 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/replication/ReplicaGroup.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/replication/ReplicaGroup.java @@ -227,7 +227,7 @@ public synchronized void removeNode(int nodeId) { * @param replicaId 副本id * @return replica */ - private Replica getReplica(int replicaId) { + public Replica getReplica(int replicaId) { return replicas.stream() .filter(r -> r.replicaId() == replicaId) .findFirst() @@ -254,6 +254,19 @@ public boolean isLeader(){ return this.state == ElectionNode.State.LEADER; } + public synchronized boolean addReplicaTask(int replicaId) { + if (!isLeader()) { + logger.error("add replica task error, not leader, partition: {}", topicPartitionGroup); + return false; + } + if (getReplica(replicaId) == null) { + logger.error("add replica task error, replica not exist, partition: {}", topicPartitionGroup); + return false; + } + replicateResponseQueue.put(new DelayedCommand(ONE_SECOND_NANO, replicaId)); + return true; + } + /** * 是否需要复制,kafka的coordinators不需要复制 * @return if topic need replicate @@ -401,6 +414,7 @@ private void replicateLocal() { delayTimeNs = ONE_SECOND_NANO; } + getReplica(localReplicaId).writePosition(replicableStore.rightPosition()); replicateResponseQueue.put(new DelayedCommand(delayTimeNs, localReplicaId)); } @@ -416,12 +430,9 @@ private void replicateMessage(Replica replica) { AppendEntriesRequest request = generateAppendEntriesRequest(replica); if (request == null) { - if (!electionConfig.enableSharedHeartbeat()) { - if (SystemClock.now() - replica.getLastAppendTime() >= electionConfig.getHeartbeatTimeout()) { - request = generateHeartbeatRequest(replica); - } - } - if (request == null) { + if (SystemClock.now() - replica.getLastAppendTime() >= electionConfig.getElectionTimeout()) { + request = generateHeartbeatRequest(replica); + } else { replicateResponseQueue.put(new DelayedCommand(ONE_MS_NANO, replica.replicaId())); return; } @@ -566,7 +577,7 @@ public void onSuccess(Command request, Command response) { 1, entriesLength, usTime() - startTimeUs); } catch (Exception e) { - logger.info("Partition group {}/node {} process append entries reponse fail", + logger.info("Partition group {}/node {} process append entries response fail", topicPartitionGroup, localReplicaId, e); } finally { replicateResponseQueue.put(new DelayedCommand(0, replica.replicaId())); @@ -660,9 +671,18 @@ private void maybeReplicateConsumePos(Replica replica) { return; } - ReplicateConsumePosRequest request = new ReplicateConsumePosRequest(consumePositions); + ReplicateConsumePosRequest request = new ReplicateConsumePosRequest(); + request.setConsumePositions(consumePositions); + request.setTerm(currentTerm); + request.setLeaderId(leaderId); + request.setTopic(topicPartitionGroup.getTopic()); + request.setGroup(topicPartitionGroup.getPartitionGroupId()); JoyQueueHeader header = new JoyQueueHeader(Direction.REQUEST, CommandType.REPLICATE_CONSUME_POS_REQUEST); + if (electionConfig.enableReplicatePositionV3Protocol()) { + header.setVersion(JoyQueueHeader.VERSION_V3); + } + if (logger.isDebugEnabled() || electionConfig.getOutputConsumePos()) { logger.debug("Partition group {}/node {} send consume position {} to node {}", topicPartitionGroup, localReplicaId, consumePositions, replica.replicaId()); diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/replication/ReplicationManager.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/replication/ReplicationManager.java index d0979c9aa..9f7457b7d 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/replication/ReplicationManager.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/replication/ReplicationManager.java @@ -88,6 +88,7 @@ public void doStart() throws Exception { ClientConfig clientConfig = new ClientConfig(); clientConfig.setIoThreadName("joyqueue-Replication-IO-EventLoop"); clientConfig.setMaxAsync(1000); + clientConfig.setFastfailAsync(true); clientConfig.setIoThread(32); clientConfig.setSocketBufferSize(1024 * 1024 * 1); clientConfig.setConnectionTimeout(300 * 1); diff --git a/joyqueue-server/joyqueue-broker-core/src/main/resources/manage/routing.xml b/joyqueue-server/joyqueue-broker-core/src/main/resources/manage/routing.xml index 4cf24d561..e969b4fb3 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/resources/manage/routing.xml +++ b/joyqueue-server/joyqueue-broker-core/src/main/resources/manage/routing.xml @@ -258,5 +258,7 @@ handlers="brokerManageService.describeTopic"/> + \ No newline at end of file diff --git a/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/election/ElectionCommandCodecTest.java b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/election/ElectionCommandCodecTest.java index c00cbff4a..0dcd09e46 100644 --- a/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/election/ElectionCommandCodecTest.java +++ b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/election/ElectionCommandCodecTest.java @@ -15,9 +15,10 @@ */ package org.joyqueue.broker.election; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import org.joyqueue.broker.consumer.model.ConsumePartition; import org.joyqueue.broker.consumer.position.model.Position; -import org.joyqueue.broker.election.TopicPartitionGroup; import org.joyqueue.broker.election.command.AppendEntriesRequest; import org.joyqueue.broker.election.command.AppendEntriesResponse; import org.joyqueue.broker.election.command.ReplicateConsumePosRequest; @@ -43,8 +44,6 @@ import org.joyqueue.broker.election.network.codec.VoteResponseDecoder; import org.joyqueue.broker.election.network.codec.VoteResponseEncoder; import org.joyqueue.network.transport.codec.JoyQueueHeader; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import org.junit.Assert; import org.junit.Test; @@ -162,7 +161,8 @@ public void testAppendEntriesResponseCodec() throws Exception { public void testReplicateConsumePosRequestCodec() throws Exception { Map consumePositions = new HashMap<>(); consumePositions.put(new ConsumePartition("logbook18-HT", "logbookApi.lgbk18", Short.valueOf("0")), new Position(-1,-1,-1,-1)); - ReplicateConsumePosRequest request = new ReplicateConsumePosRequest(consumePositions); + ReplicateConsumePosRequest request = new ReplicateConsumePosRequest(); + request.setConsumePositions(consumePositions); request.setHeader(new JoyQueueHeader()); ReplicateConsumePosRequestEncoder encoder = new ReplicateConsumePosRequestEncoder(); diff --git a/pom.xml b/pom.xml index 616faacfb..e7302f7e7 100644 --- a/pom.xml +++ b/pom.xml @@ -31,16 +31,6 @@ Joyqueue Community http://www.joyqueue.org - - - Rudy Steiner - rudy_steiner@163.com - - - Li Yue - liyue2008@gmail.com - - Apache License, Version 2.0