Skip to content

Commit

Permalink
修复并行消费问题
Browse files Browse the repository at this point in the history
  • Loading branch information
gaohaoxiang committed Sep 3, 2020
1 parent b2f207d commit 439d035
Showing 1 changed file with 14 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ public PullResult getMessage(Consumer consumer, int count, long ackTimeout, long
}
if (pullResult == null || pullResult.isEmpty()) {
List<Short> partitionList = clusterManager.getLocalPartitions(TopicName.parse(consumer.getTopic()));
// if (partitionManager.isRetry(consumer)) {
// partitionList = new ArrayList<>(partitionList);
// partitionList.add(Partition.RETRY_PARTITION_ID);
// }
if (partitionManager.isRetry(consumer)) {
partitionList = new ArrayList<>(partitionList);
partitionList.add(Partition.RETRY_PARTITION_ID);
}
pullResult = getFromPartition(consumer, partitionList, count, ackTimeout, accessTimes, concurrent);
}
return pullResult;
Expand Down Expand Up @@ -348,15 +348,6 @@ private PullResult getFromPartition(Consumer consumer, List<Short> partitionList
}
listIndex = partitionManager.selectPartitionIndex(partitionSize, listIndex + i, accessTimes);
short partition = partitionList.get(listIndex);
if(partition==Partition.RETRY_PARTITION_ID){
// Try read once message from retry queue
pullResult = getRetryMessages(consumer, (short) 1);
if(!pullResult.isEmpty()){
break;
}
// 重试队列没有消息,继续轮询剩余队列
continue;
}
ConsumePartition consumePartition = new ConsumePartition(consumer.getTopic(), consumer.getApp(), partition);
SlideWindow slideWindow = slideWindowMap.computeIfAbsent(consumePartition,
k -> {
Expand All @@ -378,12 +369,21 @@ private PullResult getFromPartition(Consumer consumer, List<Short> partitionList
}

//超过并行度取下一个Partition
if(slideWindow.concurrentCount() >= concurrent) {
if (slideWindow.concurrentCount() >= concurrent) {
continue;
}

if(slideWindow.getAppendLock().tryLock()) {
try {
if (partition == Partition.RETRY_PARTITION_ID) {
pullResult = getRetryMessages(consumer, (short) 1);
if (pullResult != null && !pullResult.isEmpty()) {
return pullResult;
} else {
continue;
}
}

// 获取消息拉取位置
long pullIndex = getPullIndex(consumer, partition);

Expand Down

0 comments on commit 439d035

Please sign in to comment.