Skip to content

Commit

Permalink
优化raft相关 (#326)
Browse files Browse the repository at this point in the history
* 优化故障时心跳处理
* 复制消费位置改为raft判断
* 修复commitPosition判断问题
* transport添加快速失败,避免阻塞线程
* 添加复制任务补偿
* 补充日志
  • Loading branch information
llIlll committed Dec 28, 2020
1 parent d3ad5de commit b6b2136
Show file tree
Hide file tree
Showing 23 changed files with 255 additions and 82 deletions.
6 changes: 0 additions & 6 deletions MAINTAINERS.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ public int hashCode() {
return Objects.hash(name, partitions, type, priorityPartitions, policy);
}

@Override
public String toString() {
return "Topic{" +
"name=" + name +
", partitions=" + partitions +
", type=" + type +
", priorityPartitions=" + priorityPartitions +
", policy=" + policy +
'}';
}

public static class TopicPolicy implements Serializable {
private Long storeMaxTime;
private Boolean storeCleanKeepUnconsumed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ public void acquire(final SemaphoreType type, final long timeout) throws Transpo
}
}

public boolean tryAcquire(final SemaphoreType type) throws TransportException {
if (type == null) {
return false;
}
Semaphore semaphore = type == SemaphoreType.ASYNC ? asyncSemaphore : onewaySemaphore;
return semaphore.tryAcquire();
}

/**
* 释放信号量
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,7 @@ public void encode(Object obj, ByteBuf buffer) throws TransportException.CodecEx
if (payload instanceof JoyQueuePayload) {
((JoyQueuePayload) payload).setHeader(header);
}

int oldVersion = header.getVersion();
if (payload.getClass().getName().equals("org.joyqueue.nsr.network.command.CreatePartitionGroup")
|| payload.getClass().getName().equals("org.joyqueue.nsr.network.command.RemovePartitionGroup")
|| payload.getClass().getName().equals("org.joyqueue.nsr.network.command.UpdatePartitionGroup")) {
header.setVersion(JoyQueueHeader.VERSION_V2);
oldVersion = JoyQueueHeader.VERSION_V2;
} else {
header.setVersion(JoyQueueHeader.CURRENT_VERSION);
}
headerCodec.encode(header, buffer);

header.setVersion(oldVersion);
encoder.encode((Payload) payload, buffer);
} else {
header.setVersion(JoyQueueHeader.CURRENT_VERSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ public class JoyQueueHeader implements Header {
public static final byte VERSION_V1 = 1;
public static final byte VERSION_V2 = 2;
public static final byte VERSION_V3 = 3;
public static final byte VERSION_V4 = 4;

public static final byte CURRENT_VERSION = VERSION_V3;
public static final byte CURRENT_VERSION = VERSION_V4;

public static final int MAGIC = 0xCAFEBEBE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class TransportConfig {
private boolean nonBlockOneway = false;
// 非阻塞异步
private boolean nonBlockAsync = false;
// 快速失败
private boolean fastfailAsync = false;
// 最大异步请求数
private int maxAsync = 10240;
// 异步回调线程数量
Expand Down Expand Up @@ -281,6 +283,14 @@ public boolean isNonBlockOneway() {
return nonBlockOneway;
}

public boolean isFastfailAsync() {
return fastfailAsync;
}

public void setFastfailAsync(boolean fastfailAsync) {
this.fastfailAsync = fastfailAsync;
}

public void setNonBlockAsync(boolean nonBlockAsync) {
this.nonBlockAsync = nonBlockAsync;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public static final TransportConfig buildTransportConfig(final PropertySupplier
transportConfig.setNonBlockOneway(property.getBoolean());
} else if (fullKey.equals(keyPrefix + TRANSPORT_CALL_NON_BLOCK_ASYNC)) {
transportConfig.setNonBlockAsync(property.getBoolean());
} else if (fullKey.equals(keyPrefix + TRANSPORT_CALL_FAST_FAIL_ASYNC)) {
transportConfig.setFastfailAsync(property.getBoolean());
} else if (fullKey.equals(keyPrefix + TRANSPORT_SEND_TIMEOUT)) {
transportConfig.setSendTimeout(property.getInteger());
} else if (fullKey.equals(keyPrefix + TRANSPORT_RETRY_DELAY)) {
Expand Down Expand Up @@ -132,6 +134,7 @@ public static final TransportConfig buildTransportConfig(final PropertySupplier
public static final String TRANSPORT_CALL_BACK_THREADS = "transport.callbackThreads";
public static final String TRANSPORT_CALL_NON_BLOCK_ONEWAY = "transport.nonBlockOneway";
public static final String TRANSPORT_CALL_NON_BLOCK_ASYNC = "transport.nonBlockAsync";
public static final String TRANSPORT_CALL_FAST_FAIL_ASYNC = "transport.fastfailAsync";
public static final String TRANSPORT_SEND_TIMEOUT = "transport.sendTimeout";
public static final String TRANSPORT_RETRY_DELAY = "transport.retryDelay";
public static final String TRANSPORT_RETRY_MAX = "transport.retryMax";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package org.joyqueue.network.transport.support;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.network.transport.ChannelTransport;
import org.joyqueue.network.transport.RequestBarrier;
Expand All @@ -30,9 +33,6 @@
import org.joyqueue.network.transport.exception.TransportException;
import org.joyqueue.toolkit.network.IpUtil;
import org.joyqueue.toolkit.time.SystemClock;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -151,7 +151,19 @@ public void async(final Command command, final long timeout, CommandCallback cal
long sendTimeout = timeout <= 0 ? barrier.getSendTimeout() : timeout;
// 获取信号量
if (!config.isNonBlockAsync()) {
barrier.acquire(RequestBarrier.SemaphoreType.ASYNC, sendTimeout);
if (config.isFastfailAsync()) {
if (!barrier.tryAcquire(RequestBarrier.SemaphoreType.ASYNC)) {
callback.onException(command, TransportException.RequestExcessiveException.build());
return;
}
} else {
try {
barrier.acquire(RequestBarrier.SemaphoreType.ASYNC, timeout);
} catch (Exception e) {
callback.onException(command, e);
return;
}
}
}

try {
Expand Down Expand Up @@ -217,7 +229,13 @@ public void oneway(final Command command, final long timeout) throws TransportEx
long time = SystemClock.now();
// 获取信号量
if (!config.isNonBlockOneway()) {
barrier.acquire(RequestBarrier.SemaphoreType.ONEWAY, sendTimeout);
if (config.isFastfailAsync()) {
if (!barrier.tryAcquire(RequestBarrier.SemaphoreType.ONEWAY)) {
throw TransportException.RequestExcessiveException.build();
}
} else {
barrier.acquire(RequestBarrier.SemaphoreType.ONEWAY, timeout);
}
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public int getHeartbeatTimeout() {
return PropertySupplier.getValue(propertySupplier, ElectionConfigKey.HEARTBEAT_TIMEOUT);
}

public int getHeartbeatMaxTimeout() {
return PropertySupplier.getValue(propertySupplier, ElectionConfigKey.HEARTBEAT_MAX_TIMEOUT);
}

public int getSendCommandTimeout() {
return PropertySupplier.getValue(propertySupplier, ElectionConfigKey.SEND_COMMAND_TIMEOUT);
}
Expand Down Expand Up @@ -161,6 +165,10 @@ public boolean enableSharedHeartbeat() {
return PropertySupplier.getValue(propertySupplier, ElectionConfigKey.ENABLE_SHARED_HEARTBEAT);
}

public boolean enableReplicatePositionV3Protocol() {
return PropertySupplier.getValue(propertySupplier, ElectionConfigKey.ENABLE_REPLICATE_POSITION_V3_PROTOCOL);
}

public void setListenPort(String port) {
listenPort = Integer.valueOf(port);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public enum ElectionConfigKey implements PropertyDef {
EXECUTOR_THREAD_NUM_MAX("election.executor.thread.num.max", 50, Type.INT),
TIMER_SCHEDULE_THREAD_NUM("election.timer.schedule.thread.num", 10, Type.INT),
HEARTBEAT_TIMEOUT("election.heartbeat.timeout", 1000, Type.INT),
HEARTBEAT_MAX_TIMEOUT("election.heartbeat.max.timeout", 1000 * 30, Type.INT),
SEND_COMMAND_TIMEOUT("election.send.command.timeout", 1000 * 5, Type.INT),
MAX_BATCH_REPLICATE_SIZE("election.max.replicate.length", 1024 * 1024 * 3, Type.INT),
DISABLE_STORE_TIMEOUT("election.disable.store.timeout", 1000 * 5, Type.INT),
Expand All @@ -50,6 +51,7 @@ public enum ElectionConfigKey implements PropertyDef {
CONNECTION_TIMEOUT("election.connection.timeout", 100 * 1, Type.INT),
CONNECTION_RETRY_DELAY("election.connection.retryDelay", 1000 * 10, Type.INT),
ENABLE_SHARED_HEARTBEAT("election.enable.shared.heartbeat", false, Type.BOOLEAN),
ENABLE_REPLICATE_POSITION_V3_PROTOCOL("election.enable.replicate.position.v3.protocol", false, Type.BOOLEAN),

;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.joyqueue.broker.election.command.TimeoutNowResponse;
import org.joyqueue.broker.election.command.VoteRequest;
import org.joyqueue.broker.election.command.VoteResponse;
import org.joyqueue.broker.replication.Replica;
import org.joyqueue.broker.replication.ReplicaGroup;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicConfig;
Expand Down Expand Up @@ -153,7 +154,7 @@ protected void doStop() {

super.doStop();

logger.info("Raft leader election of partition group {}/node {} stoped",
logger.info("Raft leader election of partition group {}/node {} stopped",
topicPartitionGroup, localNode);
}

Expand Down Expand Up @@ -260,6 +261,10 @@ public void setLeaderId(int leaderId) throws Exception {
}
}

public int getCurrentTerm() {
return currentTerm;
}

/**
* 切换状态
* @param state 切换到的状态
Expand Down Expand Up @@ -784,7 +789,7 @@ public synchronized Command handleAppendEntriesRequest(AppendEntriesRequest requ
if (request.getTerm() < currentTerm) {
logger.info("Partition group {}/node {} receive append entries request from {}, current term {} " +
"is bigger than request term {}, length is {}",
topicPartitionGroup, localNode, currentTerm, request.getLeaderId(),
topicPartitionGroup, localNode, request.getLeaderId(), currentTerm,
request.getTerm(), request.getEntriesLength());
return new Command(new JoyQueueHeader(Direction.RESPONSE, CommandType.RAFT_APPEND_ENTRIES_RESPONSE),
new AppendEntriesResponse.Build().success(false).term(currentTerm)
Expand All @@ -807,14 +812,6 @@ public synchronized Command handleAppendEntriesRequest(AppendEntriesRequest requ
}
}

private synchronized void maybeStartNewHeartbeat() {
if (electionConfig.enableSharedHeartbeat()) {
startNewHeartbeat();
} else {
resetHeartbeatTimer();
}
}

/**
* 开始新一轮心跳,向Follower节点发送心跳命令,重置心跳定时器
*/
Expand All @@ -835,6 +832,15 @@ private synchronized void startNewHeartbeat() {
if (node.equals(localNode)) {
continue;
}

if (!electionConfig.enableSharedHeartbeat()) {
Replica replica = replicaGroup.getReplica(node.getNodeId());
if (replica != null && replica.getLastAppendTime() != 0
&& SystemClock.now() - replica.getLastAppendTime() > electionConfig.getHeartbeatMaxTimeout()) {
continue;
}
}

try {
electionExecutor.submit(() -> {
JoyQueueHeader header = new JoyQueueHeader(Direction.REQUEST, CommandType.RAFT_APPEND_ENTRIES_REQUEST);
Expand Down Expand Up @@ -927,7 +933,7 @@ private synchronized void resetHeartbeatTimer() {
heartbeatTimerFuture.cancel(true);
heartbeatTimerFuture = null;
}
heartbeatTimerFuture = electionTimerExecutor.schedule(this::maybeStartNewHeartbeat,
heartbeatTimerFuture = electionTimerExecutor.schedule(this::startNewHeartbeat,
electionConfig.getHeartbeatTimeout(), TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
*/
public class ReplicateConsumePosRequest extends JoyQueuePayload {
private Map<ConsumePartition, Position> consumePositions;
private int term;
private int leaderId;
private String topic;
private int group;

public Map<ConsumePartition, Position> getConsumePositions() {
return consumePositions;
Expand All @@ -38,11 +42,36 @@ public void setConsumePositions(Map<ConsumePartition, Position> consumePositions
this.consumePositions = consumePositions;
}

public ReplicateConsumePosRequest(Map<ConsumePartition, Position> consumePositions) {
this.consumePositions = consumePositions;
public int getTerm() {
return term;
}

public void setTerm(int term) {
this.term = term;
}

public int getLeaderId() {
return leaderId;
}

public void setLeaderId(int leaderId) {
this.leaderId = leaderId;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public int getGroup() {
return group;
}

public ReplicateConsumePosRequest() {
public void setGroup(int group) {
this.group = group;
}

@Override
Expand Down

0 comments on commit b6b2136

Please sign in to comment.