Skip to content

Commit

Permalink
Merge pull request #293 from llIlll/fix-consume
Browse files Browse the repository at this point in the history
修复并行消费和flag过滤bug
  • Loading branch information
llIlll committed Aug 25, 2020
2 parents 6222319 + c1e25c8 commit f146638
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -526,22 +526,22 @@ private PullResult readMessages(Consumer consumer, short partition, long index,
int partitionGroup = clusterManager.getPartitionGroupId(TopicName.parse(consumer.getTopic()), partition);
PartitionGroupStore store = storeService.getStore(consumer.getTopic(), partitionGroup);
ReadResult readRst = store.read(partition, index, count, Long.MAX_VALUE);
if (readRst.getCode() == JoyQueueCode.SUCCESS) {
List<ByteBuffer> byteBufferList = Lists.newArrayList(readRst.getMessages());
org.joyqueue.domain.Consumer consumerConfig = clusterManager.getConsumer(TopicName.parse(consumer.getTopic()), consumer.getApp());

if (consumerConfig != null) {
// 过滤消息
List<ByteBuffer> byteBuffers = filterMessageSupport.filter(consumerConfig, byteBufferList, new FilterCallbackImpl(consumer));

// 开启延迟消费,过滤未到消费时间的消息
byteBuffers = delayHandler.handle(consumerConfig.getConsumerPolicy(), byteBuffers);
// 构建拉取结果
pullResult = new PullResult(consumer, partition, byteBuffers);
}
} else {
logger.error("read message error, error code[{}]", readRst.getCode());
}
// if (readRst.getCode() == JoyQueueCode.SUCCESS) {
// List<ByteBuffer> byteBufferList = Lists.newArrayList(readRst.getMessages());
// org.joyqueue.domain.Consumer consumerConfig = clusterManager.getConsumer(TopicName.parse(consumer.getTopic()), consumer.getApp());
//
// if (consumerConfig != null) {
// // 过滤消息
// List<ByteBuffer> byteBuffers = filterMessageSupport.filter(consumerConfig, byteBufferList, new FilterCallbackImpl(consumer));
//
// // 开启延迟消费,过滤未到消费时间的消息
// byteBuffers = delayHandler.handle(consumerConfig.getConsumerPolicy(), byteBuffers);
// // 构建拉取结果
// pullResult = new PullResult(consumer, partition, byteBuffers);
// }
// } else {
// logger.error("read message error, error code[{}]", readRst.getCode());
// }
} catch (PositionOverflowException e) {
if(e.getRight() < index) {
pullResult.setCode(JoyQueueCode.SE_INDEX_OVERFLOW);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.archive.ArchiveManager;
import org.joyqueue.broker.archive.ConsumeArchiveService;
import org.joyqueue.broker.buffer.Serializer;
Expand Down Expand Up @@ -220,19 +219,19 @@ protected PullResult getMsgByPartitionAndIndex(Consumer consumer, int group, sho
}

List<ByteBuffer> byteBuffers = readResult.getBuffers();
if (StringUtils.isNotEmpty(consumer.getApp()) &&
(!Consumer.ConsumeType.INTERNAL.equals(consumer.getType()) && !Consumer.ConsumeType.KAFKA.equals(consumer.getType()))) {

org.joyqueue.domain.Consumer consumerConfig = clusterManager.tryGetConsumer(TopicName.parse(consumer.getTopic()), consumer.getApp());

if (consumerConfig != null) {
// 过滤消息
byteBuffers = filterMessageSupport.filter(consumerConfig, byteBuffers, new FilterCallbackImpl(consumer));

// 开启延迟消费,过滤未到消费时间的消息
byteBuffers = delayHandler.handle(consumerConfig.getConsumerPolicy(), byteBuffers);
}
}
// if (StringUtils.isNotEmpty(consumer.getApp()) &&
// (!Consumer.ConsumeType.INTERNAL.equals(consumer.getType()) && !Consumer.ConsumeType.KAFKA.equals(consumer.getType()))) {
//
// org.joyqueue.domain.Consumer consumerConfig = clusterManager.tryGetConsumer(TopicName.parse(consumer.getTopic()), consumer.getApp());
//
// if (consumerConfig != null) {
// // 过滤消息
// byteBuffers = filterMessageSupport.filter(consumerConfig, byteBuffers, new FilterCallbackImpl(consumer));
//
// // 开启延迟消费,过滤未到消费时间的消息
// byteBuffers = delayHandler.handle(consumerConfig.getConsumerPolicy(), byteBuffers);
// }
// }

pullResult = new PullResult(consumer, partition, byteBuffers);
} catch (PositionOverflowException overflow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -552,24 +557,27 @@ private long getPullIndex(Consumer consumer, short partition) throws JoyQueueExc
private ReadMessagesResult readMessages(Consumer consumer, short partition, long index, int count) {
// 初始化默认
ReadMessagesResult readMessagesResult = new ReadMessagesResult();
PullResult pullResult = new PullResult(consumer, (short) -1, new ArrayList<>(0));
PullResult pullResult = new PullResult(consumer, partition, new ArrayList<>(0));
try {
int partitionGroup = clusterManager.getPartitionGroupId(TopicName.parse(consumer.getTopic()), partition);
PartitionGroupStore store = storeService.getStore(consumer.getTopic(), partitionGroup);
ReadResult readRst = store.read(partition, index, count, Long.MAX_VALUE);
if (readRst.getCode() == JoyQueueCode.SUCCESS) {
List<ByteBuffer> byteBufferList = Lists.newArrayList(readRst.getMessages());
org.joyqueue.domain.Consumer consumerConfig = clusterManager.getConsumer(TopicName.parse(consumer.getTopic()), consumer.getApp());

if (consumerConfig != null) {
// 过滤消息
List<ByteBuffer> byteBuffers = filterMessageSupport.filter(consumerConfig, byteBufferList, readMessagesResult::setFilteredMessages);

// 开启延迟消费,过滤未到消费时间的消息
byteBuffers = delayHandler.handle(consumerConfig.getConsumerPolicy(), byteBuffers);
// 构建拉取结果
pullResult = new PullResult(consumer, partition, byteBuffers);
if (readRst.getMessages() != null) {
pullResult.setBuffers(Lists.newArrayList(readRst.getMessages()));
}
// List<ByteBuffer> byteBufferList = Lists.newArrayList(readRst.getMessages());
// org.joyqueue.domain.Consumer consumerConfig = clusterManager.getConsumer(TopicName.parse(consumer.getTopic()), consumer.getApp());
//
// if (consumerConfig != null) {
// // 过滤消息
// List<ByteBuffer> byteBuffers = filterMessageSupport.filter(consumerConfig, byteBufferList, readMessagesResult::setFilteredMessages);
//
// // 开启延迟消费,过滤未到消费时间的消息
// byteBuffers = delayHandler.handle(consumerConfig.getConsumerPolicy(), byteBuffers);
// // 构建拉取结果
// pullResult = new PullResult(consumer, partition, byteBuffers);
// }
} else {
logger.error("read message error, error code[{}]", readRst.getCode());
}
Expand Down Expand Up @@ -774,25 +782,14 @@ boolean ack(TopicName topic, String app, short partition, long startIndex, int c
// 如果确认的片段是滑动窗口的第一段,需要在分区上ack,并向尾部缩小滑动窗口

while (!consumedMessagesMap.isEmpty() && (consumedMessages = consumedMessagesMap.firstEntry().getValue()).isAcked()) {
long lastMsgAckIndex = positionManager.getLastMsgAckIndex(topic, app, partition);
consumedMessagesMap.remove(consumedMessages.getStartIndex());
if (lastMsgAckIndex >= consumedMessages.getStartIndex() && lastMsgAckIndex < consumedMessages.getStartIndex() + consumedMessages.getCount()) {
positionManager.updateLastMsgAckIndex(topic, app, partition,
consumedMessages.getStartIndex() + consumedMessages.getCount(), false);
} else {
logger.warn("Ack index not match, topic: {}, partition: {}, ack: [{} - {}), currentAckIndex: {}!",
topic.getFullName(),
partition,
Format.formatWithComma(consumedMessages.getStartIndex()),
Format.formatWithComma(consumedMessages.getStartIndex() + consumedMessages.getCount()),
Format.formatWithComma(lastMsgAckIndex)
);
}
positionManager.updateLastMsgAckIndex(topic, app, partition,
consumedMessages.getStartIndex() + consumedMessages.getCount(), false);
}
ret = true;
}
if(!ret) {
logger.warn("Concurrent cunsume ack failed, topic: {}, partition: {}, ack: [{} - {}), currentAckIndex: {}.",
logger.warn("Concurrent consume ack failed, topic: {}, partition: {}, ack: [{} - {}), currentAckIndex: {}.",
topic.getFullName(),
partition,
Format.formatWithComma(startIndex),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
*/
package org.joyqueue.broker.consumer.filter;

import com.google.common.collect.Lists;
import com.jd.laf.extension.Extension;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.message.BrokerMessage;
import com.jd.laf.extension.Extension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -53,7 +52,7 @@ public void setRule(String rule) {
public List<ByteBuffer> filter(List<ByteBuffer> byteBufferList, FilterCallback filterCallback) throws JoyQueueException {
FilterResult filterResult = doFilter(byteBufferList, pattern);
List<ByteBuffer> inValidList = filterResult.getInValidList();
if (null != filterCallback) {
if (inValidList != null && !inValidList.isEmpty() && filterCallback != null) {
filterCallback.callback(inValidList);
}
return filterResult.getValidList();
Expand All @@ -73,14 +72,13 @@ public List<ByteBuffer> filter(List<ByteBuffer> byteBufferList, FilterCallback f
* @return
*/
private FilterResult doFilter(List<ByteBuffer> messages, Pattern pattern) throws JoyQueueException {
List<ByteBuffer> validList = new ArrayList<>(); // 有效队列
List<ByteBuffer> inValidList = null; // 无效队列
List<ByteBuffer> validList = Lists.newLinkedList(); // 有效队列
List<ByteBuffer> inValidList = Lists.newLinkedList(); // 无效队列
boolean /* 有效到无效 */ valid2InvalidFlag = false,
/* 无效到有效 */ invaild2ValidFlag = false;
/* 无效到有效 */ invalid2ValidFlag = false;

for (int i = 0; i < messages.size(); i++) {
ByteBuffer buffer = messages.get(i);
BrokerMessage message = null;
short flag;
try {
flag = Serializer.readFlag(buffer);
Expand All @@ -90,30 +88,30 @@ private FilterResult doFilter(List<ByteBuffer> messages, Pattern pattern) throws
}

// 是否匹配
boolean matcher = pattern.matcher("" + flag).matches();

if (i == 0 && !matcher) {
// 不是有效标签开头
inValidList = new ArrayList<>();
}
boolean isMatch = pattern.matcher("" + flag).matches();

if (matcher && !valid2InvalidFlag) {
if (isMatch) {
if (i == 0) {
valid2InvalidFlag = true;
}
validList.add(buffer);
if (inValidList != null && inValidList.size() > 0) {
invaild2ValidFlag = true;
if (invalid2ValidFlag) {
break;
}
} else if (inValidList != null && !invaild2ValidFlag) {
inValidList.add(buffer);
if (validList.size() > 0) {
valid2InvalidFlag = true;
} else {
if (i == 0) {
invalid2ValidFlag = true;
}
if (valid2InvalidFlag) {
break;
}
inValidList.add(buffer);
}
}

return new FilterResult(validList, inValidList);
}


/**
* 过滤结果
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.joyqueue.broker.consumer;

import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.consumer.FilterMessageSupport;
import org.joyqueue.broker.consumer.filter.FilterCallback;
import org.joyqueue.domain.Consumer;
import org.joyqueue.exception.JoyQueueException;
Expand Down Expand Up @@ -69,6 +68,6 @@ public void callback(List<ByteBuffer> list) throws JoyQueueException {
}
});

Assert.assertEquals(2, filter.size());
Assert.assertEquals(1, filter.size());
}
}
Loading

0 comments on commit f146638

Please sign in to comment.