Skip to content

Commit

Permalink
修复并行消费问题
Browse files Browse the repository at this point in the history
  • Loading branch information
gaohaoxiang committed Aug 31, 2020
1 parent ee17a79 commit ad2278f
Showing 1 changed file with 22 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,9 @@ private boolean extendSlideWindowAndUpdatePullIndex(
logger.debug("set new pull index:{}, topic:{}, app:{}, partition:{}", newPullIndex, consumer.getTopic(), consumer.getApp(), partition);
}
positionManager.updateLastMsgPullIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, newPullIndex);
if (consumedMessages.isReset()) {
positionManager.updateLastMsgAckIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, consumedMessages.getStartIndex(), false);
}
return true;
} else {
return false;
Expand Down Expand Up @@ -754,6 +757,7 @@ private ConsumedMessages tryFindFirstExpired(long ackTimeoutMs) {
}

ConsumedMessages appendUnsafe(String topic, short partition, long nextPullIndex, int count, long timeoutMs) {
boolean isReset = false;
if(this.nextPullIndex != nextPullIndex) { // 如果不相等,有可能是重置了消费位置,以新的消费位置为准
logger.warn("Reset concurrent consumer pull index from {} to {}, topic: {}, partition: {}.",
this.nextPullIndex,
Expand All @@ -762,8 +766,9 @@ ConsumedMessages appendUnsafe(String topic, short partition, long nextPullIndex,
partition);
consumedMessagesMap.clear();
this.nextPullIndex = nextPullIndex;
isReset = true;
}
ConsumedMessages consumedMessages = new ConsumedMessages(nextPullIndex, count, timeoutMs);
ConsumedMessages consumedMessages = new ConsumedMessages(nextPullIndex, count, timeoutMs, isReset);
this.nextPullIndex += count;
consumedMessagesMap.put(nextPullIndex, consumedMessages);
return consumedMessages;
Expand Down Expand Up @@ -794,22 +799,17 @@ boolean ack(TopicName topic, String app, short partition, long startIndex, int c
while (!consumedMessagesMap.isEmpty() && (consumedMessages = consumedMessagesMap.firstEntry().getValue()).isAcked()) {
long lastMsgAckIndex = positionManager.getLastMsgAckIndex(topic, app, partition);
consumedMessagesMap.remove(consumedMessages.getStartIndex());
if (!consumedMessages.isExpired()) {
if (lastMsgAckIndex >= consumedMessages.getStartIndex() && lastMsgAckIndex < consumedMessages.getStartIndex() + consumedMessages.getCount()) {
positionManager.updateLastMsgAckIndex(topic, app, partition,
consumedMessages.getStartIndex() + consumedMessages.getCount(), false);

// if (lastMsgAckIndex >= consumedMessages.getStartIndex() && lastMsgAckIndex < consumedMessages.getStartIndex() + consumedMessages.getCount()) {
// positionManager.updateLastMsgAckIndex(topic, app, partition,
// consumedMessages.getStartIndex() + consumedMessages.getCount(), false);
// } else {
// logger.warn("Ack index not match, topic: {}, partition: {}, ack: [{} - {}), currentAckIndex: {}!",
// topic.getFullName(),
// partition,
// Format.formatWithComma(consumedMessages.getStartIndex()),
// Format.formatWithComma(consumedMessages.getStartIndex() + consumedMessages.getCount()),
// Format.formatWithComma(lastMsgAckIndex)
// );
// }
} else {
logger.warn("Ack index not match, topic: {}, partition: {}, ack: [{} - {}), currentAckIndex: {}!",
topic.getFullName(),
partition,
Format.formatWithComma(consumedMessages.getStartIndex()),
Format.formatWithComma(consumedMessages.getStartIndex() + consumedMessages.getCount()),
Format.formatWithComma(lastMsgAckIndex)
);
}
}
ret = true;
Expand Down Expand Up @@ -837,14 +837,16 @@ private static class ConsumedMessages {
private final long startIndex;
private final int count;
private long expireTime;
private boolean reset;


// 这段消息的状态
private AtomicInteger status = new AtomicInteger(LOCKED);
ConsumedMessages(long startIndex, int count, long timeoutMs) {
ConsumedMessages(long startIndex, int count, long timeoutMs, boolean reset) {
this.startIndex = startIndex;
this.count = count;
this.expireTime = SystemClock.now() + timeoutMs;
this.reset = reset;
}

int getCount() {
Expand Down Expand Up @@ -882,5 +884,9 @@ boolean isExpired() {
boolean isAcked() {
return status.get() == ACKED;
}

public boolean isReset() {
return reset;
}
}
}

0 comments on commit ad2278f

Please sign in to comment.